package m191tom192

import (
	"context"

	"github.com/pkg/errors"
	newSchema "github.com/stackrox/rox/migrator/migrations/m_191_to_m_192_vulnerability_requests_searchable_scope/schema/new"
	"github.com/stackrox/rox/migrator/types"
	"github.com/stackrox/rox/pkg/logging"
	"github.com/stackrox/rox/pkg/postgres/pgutils"
	"github.com/stackrox/rox/pkg/sac"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

var (
	batchSize = 2000
	log       = logging.LoggerForModule()
)

func migrate(database *types.Databases) error {
	ctx := sac.WithAllAccess(context.Background())
	db := database.GormDB
	pgutils.CreateTableFromModel(ctx, db, newSchema.CreateTableVulnerabilityRequestsStmt)

	return populateScopeColumns(ctx, db)
}

func populateScopeColumns(ctx context.Context, database *gorm.DB) error {
	db := database.WithContext(ctx).Table(newSchema.VulnerabilityRequestsTableName)
	query := database.WithContext(ctx).Table(newSchema.VulnerabilityRequestsTableName).Select("serialized")

	rows, err := query.Rows()
	if err != nil {
		return errors.Wrapf(err, "failed to iterate table %s", newSchema.VulnerabilityRequestsTableName)
	}
	defer func() { _ = rows.Close() }()

	var updatedVulnReqs []*newSchema.VulnerabilityRequests
	var count int

	for rows.Next() {
		var obj newSchema.VulnerabilityRequests
		if err = query.ScanRows(rows, &obj); err != nil {
			return errors.Wrap(err, "failed to scan rows")
		}
		proto, err := newSchema.ConvertVulnerabilityRequestToProto(&obj)
		if err != nil {
			return errors.Wrapf(err, "failed to convert %+v to proto", obj)
		}

		if proto.GetScope() == nil || proto.GetScope().GetImageScope() == nil {
			continue
		}

		converted, err := newSchema.ConvertVulnerabilityRequestFromProto(proto)
		if err != nil {
			return errors.Wrapf(err, "failed to convert from proto %+v", proto)
		}

		updatedVulnReqs = append(updatedVulnReqs, converted)
		count++
		if len(updatedVulnReqs) == batchSize {
			if err = db.
				Clauses(clause.OnConflict{UpdateAll: true}).
				Model(newSchema.CreateTableVulnerabilityRequestsStmt.GormModel).
				Create(&updatedVulnReqs).Error; err != nil {
				return errors.Wrapf(err, "failed to upsert converted %d objects after %d upserted", len(updatedVulnReqs), count-len(updatedVulnReqs))
			}
			updatedVulnReqs = updatedVulnReqs[:0]
		}
	}
	if rows.Err() != nil {
		return errors.Wrapf(rows.Err(), "failed to get rows for %s", newSchema.VulnerabilityRequestsTableName)
	}
	if len(updatedVulnReqs) > 0 {
		if err = db.
			Clauses(clause.OnConflict{UpdateAll: true}).
			Model(newSchema.CreateTableVulnerabilityRequestsStmt.GormModel).
			Create(&updatedVulnReqs).Error; err != nil {
			return errors.Wrapf(err, "failed to upsert last %d objects", len(updatedVulnReqs))
		}
	}
	log.Infof("Populated scope columns for %d vulnerability requests", count)
	return nil
}
